/*
 * Copyright 2008-2023 the original author or authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      https://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.springframework.batch.core.repository.dao;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;

import com.fasterxml.jackson.annotation.JacksonAnnotation;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.core.type.WritableTypeId;
import com.fasterxml.jackson.databind.DatabindContext;
import com.fasterxml.jackson.databind.DeserializationConfig;
import com.fasterxml.jackson.databind.DeserializationContext;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.JavaType;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.MapperFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializerProvider;
import com.fasterxml.jackson.databind.cfg.MapperConfig;
import com.fasterxml.jackson.databind.deser.std.StdDeserializer;
import com.fasterxml.jackson.databind.json.JsonMapper;
import com.fasterxml.jackson.databind.jsontype.BasicPolymorphicTypeValidator;
import com.fasterxml.jackson.databind.jsontype.NamedType;
import com.fasterxml.jackson.databind.jsontype.PolymorphicTypeValidator;
import com.fasterxml.jackson.databind.jsontype.TypeIdResolver;
import com.fasterxml.jackson.databind.jsontype.TypeResolverBuilder;
import com.fasterxml.jackson.databind.jsontype.TypeSerializer;
import com.fasterxml.jackson.databind.jsontype.impl.StdTypeResolverBuilder;
import com.fasterxml.jackson.databind.module.SimpleModule;
import com.fasterxml.jackson.databind.ser.std.StdSerializer;

import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
import org.springframework.batch.core.JobParameter;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.repository.ExecutionContextSerializer;
import org.springframework.core.annotation.AnnotationUtils;
import org.springframework.util.Assert;

import static com.fasterxml.jackson.core.JsonToken.START_OBJECT;

/**
 * Implementation that uses Jackson2 to provide (de)serialization.
 *
 * By default, this implementation trusts a limited set of classes to be deserialized from
 * the execution context. If a class is not trusted by default and is safe to deserialize,
 * you can add it to the base set of trusted classes at
 * {@link Jackson2ExecutionContextStringSerializer construction time} or provide an
 * explicit mapping using Jackson annotations, as shown in the following example:
 *
 * <pre class="code">
 *     &#064;JsonTypeInfo(use = JsonTypeInfo.Id.CLASS)
 *     public class MyTrustedType implements Serializable {
 *
 *     }
 * </pre>
 *
 * It is also possible to provide a custom {@link ObjectMapper} with a mixin for the
 * trusted type:
 *
 * <pre class="code">
 *     ObjectMapper objectMapper = new ObjectMapper();
 *     objectMapper.addMixIn(MyTrustedType.class, Object.class);
 *     Jackson2ExecutionContextStringSerializer serializer = new Jackson2ExecutionContextStringSerializer();
 *     serializer.setObjectMapper(objectMapper);
 *     // register serializer in JobRepositoryFactoryBean
 * </pre>
 *
 * If the (de)serialization is only done by a trusted source, you can also enable default
 * typing:
 *
 * <pre class="code">
 *     PolymorphicTypeValidator polymorphicTypeValidator = .. // configure your trusted PolymorphicTypeValidator
 *     ObjectMapper objectMapper = new ObjectMapper();
 *     objectMapper.activateDefaultTyping(polymorphicTypeValidator);
 *     Jackson2ExecutionContextStringSerializer serializer = new Jackson2ExecutionContextStringSerializer();
 *     serializer.setObjectMapper(objectMapper);
 *     // register serializer in JobRepositoryFactoryBean
 * </pre>
 *
 * @author Marten Deinum
 * @author Mahmoud Ben Hassine
 * @since 3.0.7
 * @see ExecutionContextSerializer
 */
public class Jackson2ExecutionContextStringSerializer implements ExecutionContextSerializer {

	private static final String IDENTIFYING_KEY_NAME = "identifying";

	private static final String TYPE_KEY_NAME = "type";

	private static final String VALUE_KEY_NAME = "value";

	private ObjectMapper objectMapper;

	/**
	 * Create a new {@link Jackson2ExecutionContextStringSerializer}.
	 * @param trustedClassNames fully qualified names of classes that are safe to
	 * deserialize from the execution context and which should be added to the default set
	 * of trusted classes.
	 */
	public Jackson2ExecutionContextStringSerializer(String... trustedClassNames) {
		this.objectMapper = JsonMapper.builder()
			.configure(MapperFeature.DEFAULT_VIEW_INCLUSION, false)
			.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, true)
			.configure(MapperFeature.BLOCK_UNSAFE_POLYMORPHIC_BASE_TYPES, true)
			.setDefaultTyping(createTrustedDefaultTyping(trustedClassNames))
			.addModule(new JobParametersModule())
			.addModule(new JavaTimeModule())
			.build();
	}

	public void setObjectMapper(ObjectMapper objectMapper) {
		Assert.notNull(objectMapper, "ObjectMapper must not be null");
		this.objectMapper = objectMapper.copy();
		this.objectMapper.registerModule(new JobParametersModule());
	}

	@Override
	public Map<String, Object> deserialize(InputStream in) throws IOException {

		TypeReference<HashMap<String, Object>> typeRef = new TypeReference<>() {
		};
		return objectMapper.readValue(in, typeRef);
	}

	@Override
	public void serialize(Map<String, Object> context, OutputStream out) throws IOException {

		Assert.notNull(context, "A context is required");
		Assert.notNull(out, "An OutputStream is required");

		objectMapper.writeValue(out, context);
	}

	// BATCH-2680
	/**
	 * Custom Jackson module to support {@link JobParameter} and {@link JobParameters}
	 * serialization and deserialization.
	 */
	private class JobParametersModule extends SimpleModule {

		private static final long serialVersionUID = 1L;

		private JobParametersModule() {
			super("Job parameters module");
			setMixInAnnotation(JobParameters.class, JobParametersMixIn.class);
			addDeserializer(JobParameter.class, new JobParameterDeserializer());
			addSerializer(JobParameter.class, new JobParameterSerializer(JobParameter.class));
		}

		private abstract class JobParametersMixIn {

			@JsonIgnore
			abstract boolean isEmpty();

			@JsonIgnore
			abstract Map<String, JobParameter<?>> getIdentifyingParameters();

		}

		private class JobParameterSerializer extends StdSerializer<JobParameter> {

			protected JobParameterSerializer(Class<JobParameter> type) {
				super(type);
			}

			@Override
			public void serializeWithType(JobParameter value, JsonGenerator gen, SerializerProvider provider,
					TypeSerializer typeSer) throws IOException {
				WritableTypeId typeId = typeSer.typeId(value, START_OBJECT);
				typeSer.writeTypePrefix(gen, typeId);
				serialize(value, gen, provider);
				typeSer.writeTypeSuffix(gen, typeId);
			}

			@Override
			public void serialize(JobParameter jobParameter, JsonGenerator jsonGenerator,
					SerializerProvider serializerProvider) throws IOException {
				jsonGenerator.writeFieldName(VALUE_KEY_NAME);
				jsonGenerator.writeObject(jobParameter.getValue());
				jsonGenerator.writeFieldName(TYPE_KEY_NAME);
				jsonGenerator.writeString(jobParameter.getType().getName());
				jsonGenerator.writeFieldName(IDENTIFYING_KEY_NAME);
				jsonGenerator.writeObject(jobParameter.isIdentifying());
			}

		}

		private class JobParameterDeserializer extends StdDeserializer<JobParameter> {

			private static final long serialVersionUID = 1L;

			JobParameterDeserializer() {
				super(JobParameter.class);
			}

			@SuppressWarnings(value = { "unchecked", "rawtypes" })
			@Override
			public JobParameter deserialize(JsonParser parser, DeserializationContext context) throws IOException {
				JsonNode node = parser.readValueAsTree();
				boolean identifying = node.get(IDENTIFYING_KEY_NAME).asBoolean();
				String type = node.get(TYPE_KEY_NAME).asText();
				JsonNode value = node.get(VALUE_KEY_NAME);
				try {
					Class<?> parameterType = Class.forName(type);
					Object typedValue = objectMapper.convertValue(value, parameterType);
					return new JobParameter(typedValue, parameterType, identifying);
				}
				catch (ClassNotFoundException e) {
					throw new RuntimeException("Unable to deserialize job parameter " + value.asText(), e);
				}
			}

		}

	}

	/**
	 * Creates a TypeResolverBuilder that checks if a type is trusted.
	 * @return a TypeResolverBuilder that checks if a type is trusted.
	 * @param trustedClassNames array of fully qualified trusted class names
	 */
	private static TypeResolverBuilder<? extends TypeResolverBuilder> createTrustedDefaultTyping(
			String[] trustedClassNames) {
		TypeResolverBuilder<StdTypeResolverBuilder> result = new TrustedTypeResolverBuilder(
				ObjectMapper.DefaultTyping.NON_FINAL, trustedClassNames);
		result = result.init(JsonTypeInfo.Id.CLASS, null);
		result = result.inclusion(JsonTypeInfo.As.PROPERTY);
		return result;
	}

	/**
	 * An implementation of {@link ObjectMapper.DefaultTypeResolverBuilder} that inserts
	 * an {@code allow all} {@link PolymorphicTypeValidator} and overrides the
	 * {@code TypeIdResolver}
	 *
	 * @author Rob Winch
	 */
	static class TrustedTypeResolverBuilder extends ObjectMapper.DefaultTypeResolverBuilder {

		private final String[] trustedClassNames;

		TrustedTypeResolverBuilder(ObjectMapper.DefaultTyping defaultTyping, String[] trustedClassNames) {
			super(defaultTyping,
					// we do explicit validation in the TypeIdResolver
					BasicPolymorphicTypeValidator.builder().allowIfSubType(Object.class).build());
			this.trustedClassNames = trustedClassNames != null
					? Arrays.copyOf(trustedClassNames, trustedClassNames.length) : null;
		}

		@Override
		protected TypeIdResolver idResolver(MapperConfig<?> config, JavaType baseType,
				PolymorphicTypeValidator subtypeValidator, Collection<NamedType> subtypes, boolean forSer,
				boolean forDeser) {
			TypeIdResolver result = super.idResolver(config, baseType, subtypeValidator, subtypes, forSer, forDeser);
			return new TrustedTypeIdResolver(result, this.trustedClassNames);
		}

	}

	/**
	 * A {@link TypeIdResolver} that delegates to an existing implementation and throws an
	 * IllegalStateException if the class being looked up is not trusted, does not provide
	 * an explicit mixin, and is not annotated with Jackson mappings.
	 */
	static class TrustedTypeIdResolver implements TypeIdResolver {

		private static final Set<String> TRUSTED_CLASS_NAMES = Set.of("javax.xml.namespace.QName", "java.util.UUID",
				"java.util.ArrayList", "java.util.Arrays$ArrayList", "java.util.LinkedList",
				"java.util.Collections$EmptyList", "java.util.Collections$EmptyMap", "java.util.Collections$EmptySet",
				"java.util.Collections$UnmodifiableRandomAccessList", "java.util.Collections$UnmodifiableList",
				"java.util.Collections$UnmodifiableMap", "java.util.Collections$UnmodifiableSet",
				"java.util.Collections$SingletonList", "java.util.Collections$SingletonMap",
				"java.util.Collections$SingletonSet", "java.util.Date", "java.time.Instant", "java.time.Duration",
				"java.time.LocalDate", "java.time.LocalTime", "java.time.LocalDateTime", "java.sql.Timestamp",
				"java.net.URL", "java.util.TreeMap", "java.util.HashMap", "java.util.LinkedHashMap",
				"java.util.TreeSet", "java.util.HashSet", "java.util.LinkedHashSet", "java.lang.Boolean",
				"java.lang.Byte", "java.lang.Short", "java.lang.Integer", "java.lang.Long", "java.lang.Double",
				"java.lang.Float", "java.math.BigDecimal", "java.math.BigInteger", "java.lang.String",
				"java.lang.Character", "java.lang.CharSequence", "java.util.Properties", "[Ljava.util.Properties;",
				"org.springframework.batch.core.JobParameter", "org.springframework.batch.core.JobParameters",
				"java.util.concurrent.ConcurrentHashMap", "java.sql.Date");

		private final Set<String> trustedClassNames = new LinkedHashSet<>(TRUSTED_CLASS_NAMES);

		private final TypeIdResolver delegate;

		TrustedTypeIdResolver(TypeIdResolver delegate, String[] trustedClassNames) {
			this.delegate = delegate;
			if (trustedClassNames != null) {
				this.trustedClassNames.addAll(Arrays.asList(trustedClassNames));
			}
		}

		@Override
		public void init(JavaType baseType) {
			delegate.init(baseType);
		}

		@Override
		public String idFromValue(Object value) {
			return delegate.idFromValue(value);
		}

		@Override
		public String idFromValueAndType(Object value, Class<?> suggestedType) {
			return delegate.idFromValueAndType(value, suggestedType);
		}

		@Override
		public String idFromBaseType() {
			return delegate.idFromBaseType();
		}

		@Override
		public JavaType typeFromId(DatabindContext context, String id) throws IOException {
			DeserializationConfig config = (DeserializationConfig) context.getConfig();
			JavaType result = delegate.typeFromId(context, id);
			String className = result.getRawClass().getName();
			if (isTrusted(className)) {
				return result;
			}
			boolean isExplicitMixin = config.findMixInClassFor(result.getRawClass()) != null;
			if (isExplicitMixin) {
				return result;
			}
			Class<?> rawClass = result.getRawClass();
			JacksonAnnotation jacksonAnnotation = AnnotationUtils.findAnnotation(rawClass, JacksonAnnotation.class);
			if (jacksonAnnotation != null) {
				return result;
			}
			throw new IllegalArgumentException("The class with " + id + " and name of " + className
					+ " is not trusted. "
					+ "If you believe this class is safe to deserialize, you can add it to the base set of trusted classes "
					+ "at construction time or provide an explicit mapping using Jackson annotations or a custom ObjectMapper. "
					+ "If the serialization is only done by a trusted source, you can also enable default typing.");
		}

		private boolean isTrusted(String id) {
			return this.trustedClassNames.contains(id);
		}

		@Override
		public String getDescForKnownTypeIds() {
			return delegate.getDescForKnownTypeIds();
		}

		@Override
		public JsonTypeInfo.Id getMechanism() {
			return delegate.getMechanism();
		}

	}

}
